跳到主要内容

Flink 快速开始

版本支持

LakeSoulFlink
2.4.x+1.17
2.1.x-2.3.x1.14

PG配置

添加如下配置到 $FLINK_HOME/conf/flink-conf.yaml:

containerized.master.env.LAKESOUL_PG_DRIVER: com.lakesoul.shaded.org.postgresql.Driver
containerized.master.env.LAKESOUL_PG_USERNAME: root
containerized.master.env.LAKESOUL_PG_PASSWORD: root
containerized.master.env.LAKESOUL_PG_URL: jdbc:postgresql://localhost:5432/test_lakesoul_meta?stringtype=unspecified
containerized.taskmanager.env.LAKESOUL_PG_DRIVER: com.lakesoul.shaded.org.postgresql.Driver
containerized.taskmanager.env.LAKESOUL_PG_USERNAME: root
containerized.taskmanager.env.LAKESOUL_PG_PASSWORD: root
containerized.taskmanager.env.LAKESOUL_PG_URL: jdbc:postgresql://localhost:5432/test_lakesoul_meta?stringtype=unspecified

注意master和taskmanager都需要设置

提示

Postgres 数据库的连接信息、用户名密码需要根据实际情况修改。

警告

注意如果使用 Session 模式来启动作业,即将作业以 client 方式提交到 Flink Standalone Cluster,则 flink run 作为 client,是不会读取上面配置,因此需要再单独配置环境变量,即:

export LAKESOUL_PG_DRIVER=com.lakesoul.shaded.org.postgresql.Driver
export LAKESOUL_PG_URL=jdbc:postgresql://localhost:5432/test_lakesoul_meta?stringtype=unspecified
export LAKESOUL_PG_USERNAME=root
export LAKESOUL_PG_PASSWORD=root

SQL

可以在 LakeSoul Release 页面下载: https://github.com/lakesoul-io/LakeSoul/releases/download/v2.5.0/lakesoul-flink-2.5.0-flink-1.17.jar. 如果访问 Github 有问题,也可以通过这个链接下载:https://dmetasoul-bucket.obs.cn-southwest-2.myhuaweicloud.com/releases/lakesoul/lakesoul-flink-2.5.0-flink-1.17.jar

使用SQL Client

# Start Flink SQL Client
bin/sql-client.sh embedded -j lakesoul-flink-2.5.0-flink-1.17.jar

创建表

-- Create the test_table table, use id and name as the joint primary key, use region and date as the two-level range partition, catalog is lakesoul, and database is default
create table `lakesoul`.`default`.test_table (
`id` INT,
name STRING,
score INT,
`date` STRING,
region STRING,
PRIMARY KEY (`id`,`name`) NOT ENFORCED
) PARTITIONED BY (`region`,`date`)
WITH (
'connector'='lakesoul',
'hashBucketNum'='4',
'use_cdc'='true',
'path'='file:///tmp/lakesoul/flink/sink/test');
提示

建表语句中各个部分参数含义:

参数含义说明参数填写格式
PARTITIONED BY用于指定表的 Range 分区字段,如果不存在 range 分区字段,则省略PARTITIONED BY (date)
PRIMARY KEY用于指定表的主键,可以包含多个列PARIMARY KEY (id, name) NOT ENFORCED
connector数据源连接器,用于指定数据源类型'connector'='lakesoul'
hashBucketNum有主键表必须设置哈希分片数'hashBucketNum'='4'
path用于指定表的存储路径'path'='file:///tmp/lakesoul/flink/sink/test'
use_cdc设置表是否为 CDC 格式 (参考 CDC 表格式 )'use_cdc'='true'

删除表

DROP TABLE if exists test_table;

插入数据

批式:直接插入数据

```sql
insert into `lakesoul`.`default`.test_table values (1,'AAA', 98, '2023-05-10', 'China');
```

流式:构建流式任务,从另一个流式数据源中读取数据并写入到 LakeSoul 表中。如果上游数据是 CDC 的格式,则目标写入的 LakeSoul 表也需要设置为 CDC 表。

```sql
-- 表示将`lakesoul`.`cdcsink`.soure_table表中的全部数据,插入到lakesoul`.`default`.test_table
insert into `lakesoul`.`default`.test_table select * from `lakesoul`.`cdcsink`.soure_table;
```
警告
  1. 对流写入,需要设置 checkpoint 间隔,建议为 1 分钟以上;
  2. 根据环境设置相应的时区:
SET 'table.local-time-zone' = 'Asia/Shanghai';
-- 设置 checkpointing 时间间隔
SET 'execution.checkpointing.interval' = '2min';

更新数据

UPDATE `lakesoul`.`default`.test_table set score = 100 where id = 1;
警告

注意 update 情况下,不允许更新主键、分区列的值。 对于流的执行模式,LakeSoul 已经能够支持 ChangeLog 语义,可以支持增删改。

Delete Data

DELETE FROM `lakesoul`.`default`.test_table where id = 1;
警告

delete 情况下,不允许条件中带有分区列。 对于流的执行模式,LakeSoul 已经能够支持 ChangeLog 语义,可以支持增删改。

查询数据

全量读

SELECT * FROM `lakesoul`.`default`.test_table where region='China' and `date`='2023-05-10';

快照读

LakeSoul 支持对表执行快照读取,用户通过指定分区信息和结束时间戳,可以查询结束时间戳之前的所有数据。

-- Execute snapshot read of test_table in the region=China partition, the end timestamp of the read is 2023-05-01 15:20:15, and the time zone is Asia/Shanghai
SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('readtype'='snapshot', 'readendtime'='2023-05-01 15:20:15', 'timezone'='Asia/Shanghai')*/ WHERE region='China';

增量范围读

LakeSoul 支持对表执行范围增量读取,用户通过指定分区信息和起始时间戳、结束时间戳,可以查询这一时间范围内的增量数据。

```sql
-- Incremental reading of test_table in the region=China partition, the read timestamp range is 2023-05-01 15:15:15 to 2023-05-01 15:20:15, and the time zone is Asia/Shanghai
SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('readtype'='incremental', 'readstarttime'='2023-05-01 15:15:15 ', 'readendtime'='2023-05-01 15:20:15', 'timezone'='Asia/Shanghai')*/ WHERE region='China';
```

流读

LakeSoul 表支持在 Flink 执行流式读取,流式读基于增量读,用户通过指定起始时间戳和分区信息,可以连续不间断读取自起始时间戳以后的新增数据。若是不设置起始时间戳则从第一条数据读

-- Incremental reading of test_table in the region=China partition, the time zone is Asia/Shanghai
SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('timezone'='Asia/Shanghai')*/ WHERE region='China';

在流式读取时,LakeSoul 完整支持 Flink Changelog Stream 语义。对于 LakeSoul CDC 表,增量读取的结果仍然为 CDC 格式,即包含了 insertupdatedelete 事件,这些事件会自动转为 Flink RowData 的 RowKind 字段的对应值,从而在 Flink 中实现了全链路的增量计算。

Lookup Join

LakeSoul 表支持 Flink SQL 中的 Lookup Join 操作。Lookup Join 会将待 Join 的右表缓存在内存中,从而大幅提升 Join 速度,可以在较小维表关联的场景中使用以提升性能。 LakeSoul 默认每隔 60 秒会尝试刷新缓存,这个间隔可以通过在创建维表时设置 'lookup.join.cache.ttl'='60s' 表属性来修改。

CREATE TABLE `lakesoul`.`default`.customers (
`c_id` INT,
`name` STRING,
PRIMARY KEY (`c_id`) NOT ENFORCED)
WITH (
'connector'='lakesoul',
'hashBucketNum'='1',
'path'='file:///tmp/lakesoul/flink/sink/customers'
);
CREATE TABLE `lakesoul`.`default`.orders (
`o_id` INT,
`o_c_id` INT,
PRIMARY KEY (`o_id`) NOT ENFORCED)
WITH (
'connector'='lakesoul',
'hashBucketNum'='1',
'path'='file:///tmp/lakesoul/flink/sink/orders',
'lookup.join.cache.ttl'='60s'
);
SELECT `o_id`, `c_id`, `name`
FROM
(SELECT *, proctime() as proctime FROM `lakesoul`.`default`.orders) as o
JOIN `lakesoul`.`default`.customers FOR SYSTEM_TIME AS OF o.proctime
ON c_id = o_cid;

Orders 表需要与 Customers 表的数据进行 Lookup Join。带有后续 process time 属性的 FOR SYSTEM_TIME AS OF 子句确保在联接运算符处理 Orders 行时,Orders 的每一行都与 join 条件匹配的 Customer 行连接。它还防止连接的 Customer 表在未来发生更新时变更连接结果。lookup join 还需要一个强制的相等连接条件,在上面的示例中是 o_c_id = c_id

提示

支持Flink按批式和流式读取lakesoul表,在Flink SQLClient客户端执行命令,切换流式和批式的执行模式。

-- 按照流式执行Flink任务
SET execution.runtime-mode = streaming;
SET 'execution.checkpointing.interval' = '1min';
-- 按照批式执行Flink任务
SET execution.runtime-mode = batch;

使用 Flink SQL,指定条件查询的格式为 SELECT * FROM test_table /*+ OPTIONS('key'='value')*/ WHERE partition=xxx 。在任何一种读的模式下,分区可以指定,也可以不指定,也可以只指定一部分分区值,LakeSoul 会自动匹配满足条件的分区。

其中 /* OPTIONS() */ 为查询选项(hints),必须要直接跟在表名的后面(在 where 等其他子句的前面),LakeSoul 读取时的 hint 选项包括:

参数含义说明参数填写格式
readtype读类型,可以指定增量读incremental,快照读snapshot,不指定默认全量读'readtype'='incremental'
discoveryinterval流式增量读的发现新数据时间间隔,单位毫秒,默认为 30000'discoveryinterval'='10000'
readstarttime起始读时间戳,如果未指定起始时间戳,则默认从起始版本号开始读取'readstarttime'='2023-05-01 15:15:15'
readendtime结束读时间戳,如果未指定结束时间戳,则默认读取到当前最新版本号'readendtime'='2023-05-01 15:20:15'
timezone时间戳的时区信息,如果不指定时间戳的时区信息,则默认为按本机时区处理'timezone'='Asia/Sahanghai'